package com.pdd.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Spring-managed component that registers two {@link KafkaListener} methods on
 * {@code KafkaProducer.TOPIC_TEST}, one with group id {@code KafkaProducer.TOPIC_GROUP1}
 * and one with {@code KafkaProducer.TOPIC_GROUP2}.
 *
 * <p>Both listeners do the same thing: log the received record together with the topic it
 * arrived on, then acknowledge it through the supplied {@link Acknowledgment}.
 *
 * <p>NOTE(review): an {@link Acknowledgment} argument presumably requires the listener
 * container to be configured for manual acknowledgment; that configuration is not visible
 * in this file — confirm.
 *
 * @author:liyangpeng
 * @date:2020/4/21 15:41
 */
@Component
@Slf4j
public class KafkaConsumer {

    /**
     * Listener bound to {@code KafkaProducer.TOPIC_TEST} with group id
     * {@code KafkaProducer.TOPIC_GROUP1}.
     *
     * @param record the received record; when {@code null} nothing is logged or acknowledged
     * @param akc    handle used to acknowledge the record after it has been logged
     * @param topic  value of the {@link KafkaHeaders#RECEIVED_TOPIC} header
     */
    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment akc, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        logAndAcknowledge(record, akc, topic);
    }

    /**
     * Listener bound to {@code KafkaProducer.TOPIC_TEST} with group id
     * {@code KafkaProducer.TOPIC_GROUP2}.
     *
     * @param record the received record; when {@code null} nothing is logged or acknowledged
     * @param akc    handle used to acknowledge the record after it has been logged
     * @param topic  value of the {@link KafkaHeaders#RECEIVED_TOPIC} header
     */
    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
    public void topic_test2(ConsumerRecord<?, ?> record, Acknowledgment akc, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        logAndAcknowledge(record, akc, topic);
    }

    /**
     * Shared handling for both listeners: logs the record and topic, then acknowledges.
     *
     * @param record the received record; a {@code null} record is skipped without acknowledging
     * @param akc    handle used to acknowledge the record
     * @param topic  topic name to include in the log line
     */
    private void logAndAcknowledge(ConsumerRecord<?, ?> record, Acknowledgment akc, String topic) {
        // A plain null check replaces the former raw Optional wrapper; behavior is the same.
        if (record == null) {
            return;
        }
        log.info("[kafka-消息消费]已成功消费:{},topic:{}", record, topic);
        // Acknowledge only after the record has been logged.
        akc.acknowledge();
    }
}
